package com.catmiao.spark.stream

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.concurrent.duration.{Duration, SECONDS}

/**
 * @title: SparkStreaming01_WordCount
 * @projectName spark_study
 * @description: TODO
 * @author ChengMiao
 * @date 2024/3/25 00:31
 */
object SparkStreaming01_WordCount {

  def main(args: Array[String]): Unit = {


    // 创建环境
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    //  param1 : 环境配置，SparkConf
    //  param2 ： 采集周期【批量处理周期】
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    // 逻辑处理
    // 1. 读取端口数据
    val lines = ssc.socketTextStream("localhost", 9999)

    val words: DStream[String] = lines.flatMap(_.split(" "))
    val word: DStream[(String, Int)] = words.map((_, 1))

    val wordToCount = word.reduceByKey(_ + _)

    wordToCount.print()

    // 关闭环境
    // 由于sparkStreaming采集器是长期执行的任务，所以不能直接关闭
    // 如果main方法执行完毕，应用程序也会自动结束，所以不能让main方法执行完毕
    //  ssc.stop()

    // 1. 启动采集器
    ssc.start()
    // 2. 等待采集器的关闭
    ssc.awaitTermination()
  }

}
